package com.abyss.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
public class JoinDemo {
    public static void main(String[] args) throws Exception {
        // 1. Env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 准备两份数据的source
        // 2.1 学科数据的source
        DataSource<Subject> subjectSource = env.readCsvFile("/Users/abyss/Dev/toys/flink/H-flink-learn/src/main/resources/subject.csv")
                .includeFields(true, true)
                .fieldDelimiter(",")
                .pojoType(Subject.class, "id", "name");
        // 2.2 学生分数数据的source
        DataSource<Score> scoreSource = env.readCsvFile("/Users/abyss/Dev/toys/flink/H-flink-learn/src/main/resources/score.csv")
                .includeFields(true, true, true, true)
                .fieldDelimiter(",")
                .pojoType(Score.class, "stuId", "stuName", "subjectId", "score");

        /*
        join 的使用分3步
        1. 调用Join
        2. 设置关联的列
        3. 设置关联的条件
         */
        JoinOperator.DefaultJoin<Subject, Score> joinedDataSet = subjectSource
                .join(scoreSource).where("id").equalTo("subjectId");

        MapOperator<Tuple2<Subject, Score>, JoinedScore> result = joinedDataSet.map(new MapFunction<Tuple2<Subject, Score>, JoinedScore>() {
            @Override
            public JoinedScore map(Tuple2<Subject, Score> value) throws Exception {
                return new JoinedScore(value.f1.stuId, value.f1.stuName, value.f0.name, value.f1.score);
            }
        });

        result.print();
    }

    public static class JoinedScore {
        private int stuId;
        private String stuName;
        private String subjectName;
        private double score;

        @Override
        public String toString() {
            return "JoinedScore{" +
                    "stuId=" + stuId +
                    ", stuName='" + stuName + '\'' +
                    ", subjectName='" + subjectName + '\'' +
                    ", score=" + score +
                    '}';
        }

        public int getStuId() {
            return stuId;
        }

        public void setStuId(int stuId) {
            this.stuId = stuId;
        }

        public String getStuName() {
            return stuName;
        }

        public void setStuName(String stuName) {
            this.stuName = stuName;
        }

        public String getSubjectName() {
            return subjectName;
        }

        public void setSubjectName(String subjectName) {
            this.subjectName = subjectName;
        }

        public double getScore() {
            return score;
        }

        public void setScore(double score) {
            this.score = score;
        }

        public JoinedScore() {}

        public JoinedScore(int stuId, String stuName, String subjectName, double score) {
            this.stuId = stuId;
            this.stuName = stuName;
            this.subjectName = subjectName;
            this.score = score;
        }
    }

    public static class Subject {
        private int id;
        private String name;

        public Subject(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public Subject() {}

        @Override
        public String toString() {
            return "Subject{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

    public static class Score {
        private int stuId;
        private String stuName;
        private int subjectId;
        private double score;

        public Score() {}

        @Override
        public String toString() {
            return "Score{" +
                    "stuId=" + stuId +
                    ", stuName='" + stuName + '\'' +
                    ", subjectId=" + subjectId +
                    ", score=" + score +
                    '}';
        }

        public int getStuId() {
            return stuId;
        }

        public void setStuId(int stuId) {
            this.stuId = stuId;
        }

        public String getStuName() {
            return stuName;
        }

        public void setStuName(String stuName) {
            this.stuName = stuName;
        }

        public int getSubjectId() {
            return subjectId;
        }

        public void setSubjectId(int subjectId) {
            this.subjectId = subjectId;
        }

        public double getScore() {
            return score;
        }

        public void setScore(double score) {
            this.score = score;
        }

        public Score(int stuId, String stuName, int subjectId, double score) {
            this.stuId = stuId;
            this.stuName = stuName;
            this.subjectId = subjectId;
            this.score = score;
        }
    }
}
